package cn.doitedu.dw.sql

import java.util.Calendar

import cn.doitedu.commons.util.{SparkUtil, YieeDateUtils}
import org.apache.commons.lang.time.{DateFormatUtils, DateUtils}
import org.apache.spark.sql.{Dataset, SparkSession}

/**
  * @date: 2020/1/19
  * @site: www.doitedu.cn
  * @author: hunter.d
  * @qq: 657270652
  * @description: Active-user retention analysis.
  *               Assumes the current (calculation) date is 2020-06-09.
  *
  *               Prerequisites for verifying this program:
  *  1. Hive must contain a demo continuous-activity range table with rows like:
  *               a,2020-05-20,2020-05-20,2020-05-26
  *               a,2020-05-20,2020-05-29,2020-06-01
  *               a,2020-05-20,2020-06-03,9999-12-31
  *               b,2020-05-22,2020-05-22,2020-05-30
  *               b,2020-05-22,2020-06-03,2020-06-05
  *               c,2020-06-03,2020-06-03,2020-06-08
  *               d,2020-06-09,2020-06-09,9999-12-31
  *               t,2020-05-22,2020-06-03,2020-06-06
  *               t,2020-05-22,2020-05-22,2020-05-30
  *               x,2020-06-04,2020-06-04,2020-06-05
  *               b,2020-05-22,2020-06-09,9999-12-31
  *               t,2020-05-22,2020-06-09,9999-12-31
  *  2. Hive must contain a daily-active-users report table:
  *               create table demo_ads_apl_dau_cube(dt string,dau_cnts int)
  *               row format delimited fields terminated by ',';
  *               Test data location: data/art/demo_dau/demo_dau.txt
  *
  *
  */
object ActiveUserRetention {

  /**
    * Active-user retention analysis job.
    *
    * Reads the continuous-activity range table (`doit12.demo_dws_apl_uca_rng`,
    * schema: guid, first_dt, rng_start, rng_end), keeps only users who are
    * still active on the calculation date, explodes their activity ranges into
    * per-day retention counters, joins against the daily-active report table
    * (`doit12.demo_ads_apl_dau_cube`) and writes the retention report to Hive
    * (`doit12.demo_art_rpt`).
    *
    * @param args optional: args(0) = calculation date ("yyyy-MM-dd");
    *             defaults to "2020-06-09" for compatibility with the demo data
    */
  def main(args: Array[String]): Unit = {

    // Calculation date was previously hard-coded in three places;
    // it can now be overridden from the command line.
    val calcDate = args.headOption.getOrElse("2020-06-09")
    // Sentinel value marking a still-open activity range.
    val OPEN_END = "9999-12-31"

    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    // Continuous-activity range records from Hive.
    val df = spark.sql(
      """
        |select * from doit12.demo_dws_apl_uca_rng
      """.stripMargin)

    // Keep every range row of users who are active on the calculation date,
    // i.e. users whose latest range is still open (rng_end = 9999-12-31).
    //
    // Alternative implementations that were benchmarked on the demo data:
    //   1) filter open ranges, self-join on guid           (~1.2 s)
    //   2) collect open-range guids, broadcast + filter    (~1.2 s)
    //   3) window function last_value() over guid          (~3.1 s) <- used here
    val dau = df.selectExpr(
      "guid",
      "rng_start",
      "rng_end",
      "last_value(rng_end) over(partition by guid order by rng_end rows between unbounded preceding and unbounded following) as flag")
      .where(s"flag='$OPEN_END'")

    // Explode each activity range (e.g. 2020-05-22 .. 2020-05-30) into one
    // (dt, retention_days, 1) row per covered day.
    val kv = dau.flatMap(row => {
      val rng_start = row.getAs[String]("rng_start")
      val rng_end = row.getAs[String]("rng_end")

      // Days covered by this range; open ranges are capped at the calc date.
      val days = YieeDateUtils.dateDiff(rng_start, if (rng_end == OPEN_END) calcDate else rng_end)
      // Distance from the range start to the calculation date; (diff - i) is
      // then "how many days ago" day i of the range was, i.e. retention days.
      val diff = YieeDateUtils.dateDiff(rng_start, calcDate)
      for (i <- 0L to days) yield (YieeDateUtils.dateAdd(rng_start, i.toInt), diff - i, 1)
    }).toDF("dt", "art_days", "art_cnts")

    // Per (date, retention-days) user counts.
    kv.groupBy("dt", "art_days").count().createTempView("kv")

    // Attach each day's total active-user count to the retention counts.
    val res = spark.sql(
      """
        |select
        |kv.dt,        -- date
        |b.dau_cnts,   -- total active users that day
        |kv.art_days,  -- retention days
        |kv.`count`    -- retained user count
        |from kv join doit12.demo_ads_apl_dau_cube b on kv.dt=b.dt
      """.stripMargin)

    // Overwrite so the job is safely re-runnable
    // (previously failed on rerun because the target table already existed).
    res.write.mode("overwrite").saveAsTable("doit12.demo_art_rpt")

    spark.close()
  }

}
